#include "config.h"

#include <sys/mman.h>
#include <libaio.h>
#include <errno.h>
#include <ctype.h>

#define DBG_SUBSYS S_LIBREPLICA

#include "replica.h"
#include "clock_mem.h"
#include "job_dock.h"
#include "sysy_lib.h"
#include "clock_merge.h"
#include "squeue.h"
#include "disk.h"
#include "net_global.h"
#include "dbg.h"

/*
 * One record of the merge file, exactly as it is laid out on disk.
 *
 * The writer in this file always sets len to sizeof(clock_disk_t); the
 * loader advances from record to record by len and treats a record whose
 * len is 0 as the start of zero padding.
 */
typedef struct {
        /* size of this record in bytes, the header fields included */
        uint32_t len;
        /* crc32_sum() over the record bytes that follow len and crc */
        uint32_t crc;
        /* copied from the id member of the in-memory clock entry */
        chkid_t id;
        /* copied from the vclock member of the in-memory clock entry */
        vclock_t vclock;
} clock_disk_t;

/*
 * State shared between __clock_merge_save__() and the per-entry callback
 * __clock_merge_iterator() while a merge file is being written.
 */
typedef struct {
        /* descriptor of the file being written */
        int fd;
        /* number of bytes currently staged in buf */
        int buflen;
        /* file offset at which the staged bytes will be written */
        int offset;
        /* staging buffer; allocated with MD_MERGE_ALIGN bytes by the saver */
        char *buf;
} iterator_ctx_t;

#define MD_MERGE_ALIGN (1024 * 1024 * 4)
#define MD_MERGE_FILE "merge"
#define MD_MERGE_TMP "merge.tmp"
#define MD_MERGE_JOURNAL "merge.journal"

/*
 * Decode one block of the merge file and add every record in it to @md.
 *
 * @buf     bytes read from the file
 * @buflen  number of valid bytes in @buf
 * @md      in-memory table the records are added to
 * @offset  file offset of @buf; only used in the debug message
 *
 * Decoding ends when the buffer is exhausted or when the next record has a
 * zero length (padding). Returns 0 on success, EIO when a record checksum
 * does not match, or the error returned by clock_mem_add().
 */
static int __clock_merge_load1(const char *buf, int buflen, clock_mem_t *md, int offset)
{
        /* len and crc themselves are not covered by the checksum */
        const size_t unsummed = sizeof(uint32_t) * 2;
        const clock_disk_t *head = (void *)buf;
        uint32_t sum;
        int ret;
        int left;

        for (left = buflen; left > 0; ) {
                YASSERT(left >= (int)head->len);
                YASSERT(head->len >= (int)sizeof(*head));

                sum = crc32_sum((void *)head + unsummed, head->len - unsummed);
                if (sum != head->crc) {
                        ret = EIO;
                        GOTO(err_ret, ret);
                }

                ret = clock_mem_add(md, &head->id, &head->vclock);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                left -= head->len;
                if (left == 0)
                        break;

                /* step to the next record; a zero length marks padding */
                head = (void *)head + head->len;
                if (head->len == 0) {
                        DBUG("skip [%u, %u]\n", offset + buflen - left,
                             offset + buflen);
                        break;
                }
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Load the merge file "<home>/<idx>/merge" into @md.
 *
 * The file is read in blocks of MD_MERGE_ALIGN bytes and every block is
 * decoded by __clock_merge_load1(). A missing file is not an error.
 *
 * Returns 0 on success (including "no file"), otherwise a positive
 * errno-style code: the open()/fstat() errno, the negated _pread() result,
 * EIO when the file ends before the size reported by fstat(), or the error
 * from __clock_merge_load1().
 */
static int __clock_merge_load(const char *home, int idx, clock_mem_t *md)
{
        int ret, fd, buflen, offset, left;
        char path[MAX_PATH_LEN], *buf;
        struct stat stbuf;

        /* bound the write by the real size of the destination buffer */
        snprintf(path, sizeof(path), "%s/%d/merge", home, idx);

        DBUG("merge %s\n", path);

        fd = open(path, O_RDONLY);
        if (fd < 0) {
                ret = errno;
                if (ret == ENOENT) {
                        goto out;
                } else
                        GOTO(err_ret, ret);
        }

        /*
         * Ask the descriptor, not the path, so the size belongs to the file
         * that was actually opened. fstat() returns -1 on failure; the
         * error code to report is in errno.
         */
        ret = fstat(fd, &stbuf);
        if (unlikely(ret)) {
                ret = errno;
                GOTO(err_fd, ret);
        }

        ret = ymalloc((void **)&buf, MD_MERGE_ALIGN);
        if (unlikely(ret))
                GOTO(err_fd, ret);

        offset = 0;
        left = stbuf.st_size;
        while (left > 0) {
                ret = _pread(fd, buf, MD_MERGE_ALIGN, offset);
                if (ret < 0) {
                        ret = -ret;
                        GOTO(err_free, ret);
                }

                if (ret == 0) {
                        /*
                         * End of file although bytes are still expected:
                         * without this check neither offset nor left would
                         * move and the loop would never terminate.
                         */
                        DERROR("%s: unexpected EOF at %d, %d bytes left\n",
                               path, offset, left);
                        ret = EIO;
                        GOTO(err_free, ret);
                }

                buflen = ret;

                ret = __clock_merge_load1(buf, buflen, md, offset);
                if (unlikely(ret))
                        GOTO(err_free, ret);

                offset += buflen;
                left -= buflen;
        }

        yfree((void **)&buf);
        close(fd);
out:
        return 0;
err_free:
        yfree((void **)&buf);
err_fd:
        close(fd);
err_ret:
        return ret;
}

/*
 * clock_mem_iterator() callback used while saving: serialise one entry
 * into the staging buffer of the iterator context.
 *
 * @_ctx  iterator_ctx_t describing the output file and staging buffer
 * @_ent  clock_entry_t to serialise
 *
 * When the record does not fit into what is left of the current
 * MD_MERGE_ALIGN block, the remainder is zero-filled, the whole block is
 * written at ctx->offset and staging restarts at the beginning of the
 * buffer. Always returns 0; a failed write ends in UNIMPLEMENTED().
 */
static int __clock_merge_iterator(void *_ctx, void *_ent)
{
        iterator_ctx_t *ctx = _ctx;
        clock_entry_t *ent = _ent;
        clock_disk_t *head;
        int ret;
        int len = sizeof(*head);
        int room = MD_MERGE_ALIGN - ctx->buflen;

        if (len > room) {
                YASSERT(ctx->buflen <= MD_MERGE_ALIGN);
                if (room != 0) {
                        DBUG("skip [%u, %u], len %u\n", ctx->offset + ctx->buflen,
                              ctx->offset + MD_MERGE_ALIGN, len);
                        /* zero padding: the loader stops at a zero length */
                        memset((void *)ctx->buf + ctx->buflen, 0x0, room);
                }

                ret = _pwrite(ctx->fd, ctx->buf, MD_MERGE_ALIGN, ctx->offset);
                if (ret < 0) {
                        ret = -ret;
                        UNIMPLEMENTED(__DUMP__);
                }

                YASSERT(ret == MD_MERGE_ALIGN);

                ctx->offset += MD_MERGE_ALIGN;
                ctx->buflen = 0;
        }

        /* fill the payload first: the checksum is computed over it */
        head = (void *)ctx->buf + ctx->buflen;
        head->id = ent->id;
        head->vclock = ent->vclock;
        head->len = len;
        head->crc = crc32_sum((void *)head + sizeof(uint32_t) * 2,
                              head->len - sizeof(uint32_t) * 2);

        ctx->buflen += head->len;

        YASSERT(ctx->buflen <= MD_MERGE_ALIGN);

        return 0;
}

/*
 * Write every entry of @md into a newly created file @path.
 *
 * The file is created with O_EXCL, so an already existing @path makes the
 * call fail. Full MD_MERGE_ALIGN blocks are written by
 * __clock_merge_iterator(); whatever is still staged afterwards is written
 * here as the unpadded tail of the file.
 *
 * Returns 0 on success, otherwise the error from ymalloc(), the negated
 * _open() result or the error from clock_mem_iterator(). When the
 * iteration fails, the incomplete file is removed again.
 */
static int __clock_merge_save__(const char *path, clock_mem_t *md)
{
        int ret, fd;
        char *buf;
        iterator_ctx_t ctx;

        ret = ymalloc((void **)&buf, MD_MERGE_ALIGN);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        fd = _open(path, O_WRONLY | O_CREAT | O_EXCL | O_CLOEXEC, 0644);
        if (fd < 0) {
                ret = -fd;
                /* report the real cause; "already exists" is only one of them */
                DERROR("create %s fail, ret %d\n", path, ret);
                GOTO(err_free, ret);
        }

        ctx.fd = fd;
        ctx.buflen = 0;
        ctx.offset = 0;
        ctx.buf = buf;

        ret = clock_mem_iterator(md, __clock_merge_iterator, &ctx);
        if (unlikely(ret))
                GOTO(err_fd, ret);

        if (ctx.buflen) {
                DINFO("last [%u, %u]\n", ctx.offset,
                      ctx.offset + ctx.buflen);
                ret = _pwrite(ctx.fd, ctx.buf, ctx.buflen, ctx.offset);
                if (ret < 0) {
                        ret = -ret;
                        UNIMPLEMENTED(__DUMP__);
                }

                /*
                 * Same rule as for full blocks in the iterator: a short
                 * write must not pass as a complete merge image.
                 */
                YASSERT(ret == ctx.buflen);
        }

        yfree((void **)&buf);
        /*
         * NOTE(review): the file is closed without fsync(); confirm whether
         * the image has to be durable before the journal is written.
         */
        close(fd);

        return 0;
err_fd:
        close(fd);
        /*
         * The file was created by this call (O_EXCL) and is incomplete;
         * leaving it behind would make the next O_EXCL create fail.
         */
        unlink(path);
err_free:
        yfree((void **)&buf);
err_ret:
        return ret;
}

/*
 * Apply a merge that has been recorded in the journal.
 *
 * @home, @idx  directory "<home>/<idx>/" holding the numbered files
 * @from, @to   inclusive range of numbered files to remove
 * @_from       path of the new merge image (the tmp file)
 * @_to         path of the committed merge file
 *
 * The numbered files are removed first, then @_from is renamed onto @_to.
 * The function is used both for the first attempt and for replay after a
 * restart, so files that are already gone are not an error, and a missing
 * @_from means there is nothing left to rename.
 *
 * Returns 0 on success, otherwise the errno of stat() or rename().
 */
static int __clock_play_journal(const char *home, int idx, uint64_t from, uint64_t to,
                                const char *_from, const char *_to)
{
        int ret;
        char path[MAX_PATH_LEN];
        struct stat stbuf;
        uint64_t i;

        for (i = from; i <= to; i++) {
                /* bound the write by the real size of the destination buffer */
                snprintf(path, sizeof(path), "%s/%d/%llu", home, idx, (LLU)i);

                /* best effort: on replay the file may have been removed already */
                unlink(path);
        }

        ret = stat(_from, &stbuf);
        if (ret < 0) {
                ret = errno;
                if (ret == ENOENT) {
                        goto out;
                } else
                        GOTO(err_ret, ret);
        }

        /*
         * rename() replaces an existing target by itself, so the target is
         * not unlinked beforehand: that would only open a window in which
         * no merge file exists. A failure must reach the caller, otherwise
         * it would drop the journal although nothing was committed.
         */
        ret = rename(_from, _to);
        if (ret < 0) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        /* NOTE(review): purpose of this check is not visible from this file */
        schedule_stack_assert(NULL);

out:
        return 0;
err_ret:
        return ret;
}

/*
 * Persist @md as the new merge file of "<data>/<idx>/".
 *
 * Steps, in this order:
 *   1. write the complete image to the tmp file;
 *   2. write the journal containing "[from, to]";
 *   3. let __clock_play_journal() remove the numbered files and move the
 *      tmp file onto the merge file;
 *   4. remove the journal.
 *
 * Returns 0 on success or the error of the first step that failed.
 */
static int __clock_merge_save(const char *data, int idx, uint64_t from,
                              uint64_t to, clock_mem_t *md)
{
        char range[MAX_BUF_LEN];
        char image[MAX_PATH_LEN];
        char jnl[MAX_PATH_LEN];
        char merged[MAX_PATH_LEN];
        int ret;

        /* none of these depend on the steps below, so build them up front */
        snprintf(image, MAX_LINE_LEN, "%s/%d/%s", data, idx, MD_MERGE_TMP);
        snprintf(jnl, MAX_LINE_LEN, "%s/%d/%s", data, idx, MD_MERGE_JOURNAL);
        snprintf(merged, MAX_LINE_LEN, "%s/%d/%s", data, idx, MD_MERGE_FILE);
        snprintf(range, MAX_LINE_LEN, "[%llu, %llu]", (LLU)from, (LLU)to);

        ret = path_validate(image, YLIB_NOTDIR, YLIB_DIRCREATE);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __clock_merge_save__(image, md);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = _set_text(jnl, range, strlen(range), O_CREAT | O_TRUNC | O_SYNC);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __clock_play_journal(data, idx, from, to, image, merged);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* the merge is committed; the journal is no longer needed */
        unlink(jnl);

        return 0;
err_ret:
        return ret;
}

/*
 * Merge the numbered files "<log>/<idx>/<from>" .. "<log>/<idx>/<to>" into
 * the merge file of "<data>/<idx>/".
 *
 * @_merge  in/out. When *_merge is NULL a new table is created and primed
 *          from the existing merge file; otherwise the given table is
 *          reused. On success *_merge holds the table that was saved.
 *
 * Returns 0 on success, otherwise the error of the step that failed. On
 * failure *_merge is left untouched, and a table created by this call is
 * destroyed again instead of being leaked.
 */
int clock_merge(const char *log, const char *data, int idx, uint64_t from,
                uint64_t to, clock_mem_t **_merge)
{
        int ret, created = 0;
        char tmp[MAX_PATH_LEN];
        clock_mem_t *md;
        uint64_t i;

        DINFO("merge %s %d [%llu, %llu] to %s\n", log, idx, (LLU)from, (LLU)to, data);

        if (*_merge == NULL) {
                ret = clock_mem_init(&md, "chunk_clock_merge", 0);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                /* from here on the table is ours to release on failure */
                created = 1;

                ret = __clock_merge_load(data, idx, md);
                if (unlikely(ret))
                        GOTO(err_free, ret);
        } else {
                md = *_merge;
        }

        for (i = from; i <= to; i++) {
                /* bound the write by the real size of the destination buffer */
                snprintf(tmp, sizeof(tmp), "%s/%d/%llu", log, idx, (LLU)i);

                ret = clock_load(tmp, MD_MAX, md);
                if (unlikely(ret))
                        GOTO(err_free, ret);
        }

        ret = __clock_merge_save(data, idx, from, to, md);
        if (unlikely(ret))
                GOTO(err_free, ret);

        /* the table is not destroyed here: it is handed to the caller */
        *_merge = md;

        return 0;
err_free:
        /* a table passed in by the caller stays under the caller's control */
        if (created)
                clock_mem_destroy(md);
err_ret:
        return ret;
}

/*
 * Finish a merge of "<home>/<idx>/" that was interrupted after its journal
 * had been written.
 *
 * Without a journal there is nothing to replay; a tmp file that may have
 * been left behind is removed. With a journal, the "[from, to]" range is
 * parsed and handed to __clock_play_journal(), then the journal is
 * removed, and finally the tmp file is unlinked as well.
 *
 * Returns 0 on success, the errno of stat(), the negated _get_text()
 * result, EIO when the journal text cannot be parsed, or the error from
 * __clock_play_journal().
 */
static int __clock_merge_try_journal(const char *home, int idx)
{
        int ret;
        char buf[MAX_BUF_LEN], journal[MAX_PATH_LEN], path[MAX_PATH_LEN],
                tmp[MAX_PATH_LEN];
        struct stat stbuf;
        /* matches the %llu conversions used to write and parse the range */
        unsigned long long from, to;

        /* bound every write by the real size of its destination buffer */
        snprintf(tmp, sizeof(tmp), "%s/%d/%s", home, idx, MD_MERGE_TMP);
        snprintf(path, sizeof(path), "%s/%d/%s", home, idx, MD_MERGE_FILE);
        snprintf(journal, sizeof(journal), "%s/%d/%s", home, idx, MD_MERGE_JOURNAL);

        ret = stat(journal, &stbuf);
        if (unlikely(ret)) {
                ret = errno;
                if (ret == ENOENT) {
                        goto out;
                } else
                        GOTO(err_ret, ret);
        }

        /*
         * Zero the buffer and hold one byte back, so the text given to
         * sscanf() is NUL-terminated whatever _get_text() does about
         * termination, and the size passed matches this buffer.
         */
        memset(buf, 0x0, sizeof(buf));
        ret = _get_text(journal, buf, sizeof(buf) - 1);
        if (ret < 0) {
                ret = -ret;
                GOTO(err_ret, ret);
        }

        ret = sscanf(buf, "[%llu, %llu]", &from, &to);
        if (ret != 2) {
                ret = EIO;
                GOTO(err_ret, ret);
        }

        ret = __clock_play_journal(home, idx, from, to, tmp, path);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        unlink(journal);

out:
        /* after a replay the tmp file has been renamed; otherwise it is stale */
        unlink(tmp);

        return 0;
err_ret:
        return ret;
}

/*
 * Bring every one of the CLOCK_HASH sub-directories of @home into a
 * consistent state and load its merge file into @md.
 *
 * For each index an interrupted merge is completed first, then the merge
 * file is loaded. Returns 0 on success or the first error encountered;
 * indexes after the failing one are not processed.
 */
int clock_merge_init(const char *home, clock_mem_t *md)
{
        int ret;
        int slot = 0;

        while (slot < CLOCK_HASH) {
                /* replay must come first: it may replace the merge file */
                ret = __clock_merge_try_journal(home, slot);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __clock_merge_load(home, slot, md);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                slot++;
        }

        return 0;
err_ret:
        return ret;
}
